 1.Spark Streaming之DStream转换操作
   
   DStream上的操作与RDD的类似，分为 Transformations（转换）和 Output
Operations（输出）两种，此外转换操作中还有一些比较特殊的方法，如：
updateStateByKey、transform 以及各种 Window 相关的操作。
   
   Transformation                           Meaning
   map(func)                       将源DStream中的每个元素通过一个
                                   函数func从而得到新的DStreams
   flatMap(func)                   和map类似，但是每个输入
                                   的项可以被映射为0或更多项
   filter(func)                    选择源DStream中函数func判
                                   为true的记录作为新DStreams
   repartition(numPartitions)      通过创建更多或者更少的partition
                                   来改变此DStream的并行级别
   union(otherStream)              联合源DStreams和其他
                                   DStreams来得到新DStream
   count()                         统计源DStreams中每个RDD所含元素
                                   的个数得到单元素RDD的新DStreams
   reduce(func)                    通过函数func(两个参数一个输出)来整合源
                                   DStreams中每个RDD元素得到单元素RDD的
                                   DStreams。这个函数需要关联从而可以被并行计算
   countByValue()                  对于DStreams中元素类型为K调用此函数，得到
                                   包含(K,Long)对的新DStream，其中Long值表
                                   明相应的K在源DStream中每个RDD出现的频率
   reduceByKey(func,[numTasks])    对(K,V)对的DStream调用此函数，返回
                                   同样(K,V)的新DStream，新DStream中
                                   的对应V为使用reduce函数整合而来。默认
                                   情况下，这个操作使用Spark默认数量
                                   的并行任务(本地模式为2,集群模式中
								   的数量取决于配置参数spark.default
                                   .parallelism)。也可以传入可选的
                                   参数numTasks来设置不同数量的任务
   join(otherStream,[numTasks])    两DStream分别为(K,V)和(K,W)对，
                                   返回(K,(V,W))对的新DStream
   cogroup(otherStream,[numTasks]) 两DStream分别为(K,V)和(K,W)对，返
                                   回(K,(Seq[V],Seq[W])对新DStreams
   transform(func)                 将RDD到RDD映射的函数func作用于源
                                   DStream中每个RDD上得到新DStream。这
                                   个可用于在DStream的RDD上做任意操作 
   
   备注：
        在DStream与RDD上的转换操作非常类似（无状态的操作）
        DStream有自己特殊的操作（窗口操作、追踪状态变化操作）
        在DStream上的转换操作比RDD上的转换操作少
   DStream 的转化操作可以分为 无状态(stateless) 和 有状态(stateful) 两种：
        无状态转化操作。每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化
操作，例如 map、filter、reduceByKey 等
        有状态转化操作。需要使用之前批次的数据 或者是 中间结果来计算当前批次的
数据。有状态转化操作包括：基于滑动窗口的转化操作 或 追踪状态变化的转化操作
 
 2.无状态转换
   
   无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上，也就是转化DStream
中的每一个 RDD。
   常见的无状态转换包括：map、flatMap、filter、repartition、reduceByKey、
groupByKey；直接作用在DStream上
   重要的转换操作：transform。通过对源DStream的每个RDD应用RDD-to-RDD函数，创建
一个新的DStream。支持在新的DStream中做任何RDD操作。
   这是一个功能强大的函数，它可以允许开发者直接操作其内部的RDD。也就是说开发者，
可以提供任意一个RDD到RDD的函数，这个函数在数据流每个批次中都被调用，生成一个新
的流。
   示例：黑名单过滤
   假设：arr1为黑名单数据(自定义)，true表示数据生效，需要被过滤掉；false表示数据
未生效
val arr1 = Array(("spark", true), ("scala", false))
   
   假设：流式数据格式为"time word"，需要根据黑名单中的数据对流式数据执行过滤操
作。如"2 spark"要被过滤掉
1 hadoop
2 spark
3 scala
4 java
5 hive
   
   结果："2 spark" 被过滤
   方法一：使用外连接
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{ConstantInputDStream}

// transform
// ConstantInputDStream主要用于测试
object BlackListFilter1 {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("warn")

    // 自定义黑名单数据
    val blackList = Array(("spark", true), ("scala", false), ("hello", true), ("World", true))
      .map(elem => (elem._1.toLowerCase, elem._2))
    val blackListRDD: RDD[(String, Boolean)] = ssc.sparkContext.makeRDD(blackList)

    // 创建DStream, 使用 ConstantInputDStream用于测试
    val strArray: Array[String] =
      "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop scala"
        .split("\\s+")
        .zipWithIndex
        .map { case (word, timestamp) => s"$timestamp $word" }
    val RDD: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordStream: ConstantInputDStream[String] =
      new ConstantInputDStream[String](ssc, RDD)

    // 流式数据的处理
    wordStream.transform{ rdd =>
      rdd.map{line => (line.split("\\s+")(1).toLowerCase, line)}
        .leftOuterJoin(blackListRDD)
        .filter{case (_, (_, rightValue)) => !rightValue.getOrElse(false)}
        .map{case (_, (leftValue, _)) => leftValue}
    }.print(20)
    // 流式数据的输出
    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}
   
   方法二：使用SQL
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

// transform
// ConstantInputDStream主要用于测试
object BlackListFilter2 {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")

    // 自定义黑名单数据
    val blackList = Array(("spark", true), ("scala", false), ("hello", true), ("World", true))
      .map(elem => (elem._1.toLowerCase, elem._2))
    val blackListRDD: RDD[(String, Boolean)] = 
      ssc.sparkContext.makeRDD(blackList)

    // 创建DStream, 使用 ConstantInputDStream用于测试
    val strArray: Array[String] =
      "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop scala"
        .split("\\s+")
        .zipWithIndex
        .map { case (word, timestamp) => s"$timestamp $word" }
    val RDD: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordStream: ConstantInputDStream[String] =
      new ConstantInputDStream[String](ssc, RDD)

    // 流式数据的处理和输出,可以使用SQL
    wordStream.map(line => (line.split("\\s+")(1).toLowerCase, line))
        .transform{rdd =>
          val spark = SparkSession.builder()
            .config(rdd.sparkContext.getConf)
            .getOrCreate()
          import spark.implicits._
          val wordDF: DataFrame = rdd.toDF("word", "line")
          val blackListDF: DataFrame = blackListRDD.toDF("word", "flag")
          wordDF.join(blackListDF, Seq("word"), "left_outer")
            .filter("flag is null or flag = false")
            .select("line")
            .rdd
        }.print(20)

    // 流式数据的输出
    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}
   
   方法三：直接过滤
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter3 {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("warn")

    // 自定义黑名单数据
    val blackList = Array(("spark", true), ("scala", false), ("hello", true), ("World", true))
      .filter(_._2)
      .map(_._1.toLowerCase)


    // 创建DStream, 使用 ConstantInputDStream用于测试
    val strArray: Array[String] =
      "Hello World Hello Hadoop Hello spark kafka hive zookeeper hbase flume sqoop scala"
        .split("\\s+")
        .zipWithIndex
        .map { case (word, timestamp) => s"$timestamp $word" }
    val RDD: RDD[String] = ssc.sparkContext.makeRDD(strArray)
    val wordStream: ConstantInputDStream[String] =
      new ConstantInputDStream[String](ssc, RDD)

    // 流式数据的处理和输出
    wordStream.map(line => (line.split("\\s+")(1).toLowerCase, line))
        .filter{case (word, _) => !blackList.contains(word)}
        .map(_._2)
        .print(20)

    // 流式数据的输出
    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}

 3.有状态转换
   
   有状态的转换主要有两种：窗口操作、状态跟踪操作
   1).窗口操作
   Window Operations可以设置窗口大小和滑动窗口间隔来动态的获取当前Streaming
的状态。
   基于窗口的操作会在一个比 StreamingContext 的 batchDuration（批次间隔）更长
的时间范围内，通过整合多个批次的结果，计算出整个窗口的结果。
   基于窗口的操作需要两个参数：
       窗口长度(windowDuration)。控制每次计算最近的多少个批次的数据
       滑动间隔(slideDuration)。用来控制对新的 DStream 进行计算的间隔
   两者都必须是 StreamingContext 中批次间隔(batchDuration)的整数倍。
   
   每秒发送1个数字：
package cn.lagou.Streaming.basic

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

// 每秒发送1个数字：从1开始
object SocketLikeNCWithWindow {
  def main(args: Array[String]): Unit = {
    val port: Int = 9999

    val server = new ServerSocket(port)
    val socket: Socket = server.accept()
    println("成功连接到本地主机：" + socket.getInetAddress)
    var i = 0
    while (true) {
      i += 1
      val out = new PrintWriter(socket.getOutputStream)
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}
   
   案例一：
       观察窗口的数据；
       观察 batchDuration、windowDuration、slideDuration 三者之间的关系；
       使用窗口相关的操作；
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object WindowDemo {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.WARN)
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("warn")

    // 创建DStream
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // DStream转换
    // foreachRDD输出
    lines.foreachRDD{(rdd, time) =>
      println(s"rdd = ${rdd.id}; time =$time")
      rdd.foreach(println)
    }

    // 窗口操作
    val res1: DStream[String] = 
      lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()

    val res2: DStream[String] = lines.window(Seconds(20), Seconds(10))
    res2.print()

    val res3: DStream[Int] = res2.map(_.toInt).reduce(_ + _)
    res3.print()

    val res4: DStream[Int] =
      lines.map(_.toInt).reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res4.print()

    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}
   
   案例二：热点搜索词实时统计。每隔 10 秒，统计最近20秒的词出现的次数
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object HotWordStats {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // 设置检查点，保存状态，在生产中目录设置到hdfs
    ssc.checkpoint("data/checkpoint")

    // 创建DStream
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // DStream转换&输出
    // 每隔 10 秒，统计最近20秒的词出现的次数
    // window1= t1 + t2 + t3
    // window2= t3 + t4 + t5
    val wordCounts1: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => x+y, Seconds(20), Seconds(10))
    wordCounts1.print()

    // window2= w1 - t1 -t2 + t4 + t5
    // 需要checkpoint的支持
    val wordCounts2: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKeyAndWindow(_+_, _-_, Seconds(20), Seconds(10))
    wordCounts2.print()


    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}

   2).updateStateByKey(状态追踪操作)
   UpdateStateByKey的主要功能：
      为Streaming中每一个Key维护一份state状态，state类型可以是任意类型的，可以
是自定义对象；更新函数也可以是自定义的
      通过更新函数对该key的状态不断更新，对于每个新的batch而言，Spark Streaming
会在使用updateStateByKey 的时候为已经存在的key进行state的状态更新
      使用 updateStateByKey 时要开启 checkpoint 功能
   
   流式程序启动后计算wordcount的累计值，将每个批次的结果保存到文件
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object StateTracker1 {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("data/checkpoint/")

    // 创建DStream
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // DStream转换
    val pairsDStream: DStream[(String, Int)] = lines.flatMap(_.split("\\s+"))
      .map((_, 1))

    // updataFunc: (Seq[V], Option[S]) =>Option[S]
    val updataFunc: (Seq[Int], Option[Int]) =>
      Some[Int] = (currValues: Seq[Int], prevValue: Option[Int]) => {
      val currentSum = currValues.sum
      val prevSum: Int = prevValue.getOrElse(0)
      Some(currentSum + prevSum)
    }


    val resultDStream: DStream[(String, Int)] =
      pairsDStream.updateStateByKey[Int](updataFunc)
    resultDStream.cache()

    // DStream输出操作
    resultDStream.print()
    resultDStream.saveAsTextFiles("data/output1/")


    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}
   
   统计全局的key的状态，但是就算没有数据输入，也会在每一个批次的时候返回之前
的key的状态。
   这样的缺点：如果数据量很大的话，checkpoint 数据会占用较大的存储，而且效率
也不高。
   mapWithState：也是用于全局统计key的状态。如果没有数据输入，便不会返回之前
的key的状态，有一点增量的感觉。
   这样做的好处是，只关心那些已经发生的变化的key，对于没有数据输入，则不会返
回那些没有变化的key的数据。即使数据量很大，checkpoint也不会像updateStateByKey
那样，占用太多的存储。
package cn.lagou.Streaming.basic

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StateTracker2 {
  def main(args: Array[String]): Unit = {
    // 初始化
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("data/checkpoint/")

    // 创建DStream
    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // DStream转换
    val pairsDStream: DStream[(String, Int)] = 
      lines.flatMap(_.split("\\s+"))
      .map((_, 1))

    // 函数返回的类型即为 mapWithState 的返回类型
    // (KeyType, Option[ValueType], State[StateType]) => MappedType
    def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
      // 计算value
      val sum: Int = one.getOrElse(0) + state.getOption().getOrElse(0)
      // 保存状态
      state.update(sum)
      // 输出值
      (key, sum)
    }
    val spec = StateSpec.function(mappingFunction _)
    val resultDStream: DStream[(String, Int)] =
      pairsDStream.mapWithState[Int, (String, Int)](spec)
      // 显示快照(显示所有相关的key-value)
          .stateSnapshots()
    resultDStream.cache()

    // DStream输出操作
    resultDStream.print()
    resultDStream.saveAsTextFiles("data/output2/")


    // 启动作业
    ssc.start()
    ssc.awaitTermination()
  }
}

   
   